home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Windows Expert
/
Windows Expert.iso
/
windownt
/
queue.zip
/
QUEUE
/
QUEUE.C
< prev
next >
Wrap
C/C++ Source or Header
|
1992-10-11
|
16KB
|
668 lines
/************************************************************************\
* The enclosed files, "the software," is provided by
* Microsoft Corporation "as is" without warranty of any kind.
* MICROSOFT DISCLAIMS ALL WARRANTIES, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. You assume all risks of
* using the software.
*
* The software is Copyright (c) 1992 Microsoft Corporation.
* Original Author: John M. Hall, Microsoft SDE 9/1/92
*
* You are granted the right to freely distribute this software.
* You are granted the right to make changes provided this comment block
* is retained without modification and you acknowledge the changes.
*
\************************************************************************/
/************************************************************************\
*
* MODULE: QUEUE.C
*
* PURPOSE: Simple shared memory management.
*
* FUNCTIONS: QueueEntry() - DLL entry point
*
*
*
* COMMENTS:
*
************************************************************************/
#define QUEUE_DLL_C
#include <stdio.h>
#include <memory.h>
#include <windows.h>
#include <winbase.h>
#include <string.h>
#include <malloc.h>
#define DEBUG
#define DOSWIN32
#include "Queue.h"
/************************************************************************\
*
* FUNCTION: QueueEntry
*
* INPUTS: hDLL - handle of DLL
* dwReason - indicates why DLL called
* lpReserved - reserved
*
* RETURNS: TRUE for success, FALSE on error
*
* Note that the return value is used only when
* dwReason = DLL_PROCESS_ATTACH.
*
* GLOBAL VARS: See Queue.h for list of global variables.
*
* COMMENTS: Modified from simple dll sample
*
\************************************************************************/
/************************************************************************\
*
* FUNCTION: queue_cleanup
*
* COMMENTS: Hook called from the DLL_PROCESS_DETACH case of QueueEntry.
*           It currently performs no work.
*
\************************************************************************/
void queue_cleanup()
{
    /* intentionally empty */
}
/*
 * DLL entry point.
 *
 * hDLL       - handle of the DLL (unused)
 * dwReason   - one of the DLL_PROCESS_/DLL_THREAD_ ATTACH/DETACH codes
 * lpReserved - reserved (unused)
 *
 * Returns the result of attach_ctl() for DLL_PROCESS_ATTACH and TRUE for
 * every other reason.
 *
 * The log file is optional: a failed fopen() leaves fp_log NULL and the
 * handle is never passed to fclose() in that state.  Every global that is
 * released here is reset to NULL afterwards, because the loader may still
 * deliver DLL_PROCESS_DETACH after a failed DLL_PROCESS_ATTACH and the
 * same resource must not be released twice.
 */
BOOL QueueEntry (HANDLE hDLL, DWORD dwReason, LPVOID lpReserved)
{
    UNREFERENCED_PARAMETER(hDLL);
    UNREFERENCED_PARAMETER(lpReserved);

    switch (dwReason)
    {
        case DLL_PROCESS_ATTACH:
        {
            /**************************************************************\
            * DLL is attaching to the address space of the current process.
            \**************************************************************/
            BOOL bFlag;

            fp_log = fopen( "hp_shr.log", "a+");

            //
            // Attach control memory for Queues
            //
            bFlag = attach_ctl();
            if (!bFlag)
            {
                if (fp_log != NULL)
                {
                    fclose(fp_log);
                    fp_log = NULL;
                }
                /*
                 * attach_ctl() releases whatever mapping objects it
                 * created before it reports failure; forget them here so
                 * a later detach does not touch them again.
                 */
                lpCtrlBase = NULL;
                hControl   = NULL;
            }
            AssertBox( bFlag, "Attach Memory");
            return(bFlag);
        }

        case DLL_THREAD_ATTACH:
            /**************************************************************\
            * A new thread is being created in the current process.
            \**************************************************************/
            break;

        case DLL_THREAD_DETACH:
            /**************************************************************\
            * A thread is exiting cleanly.
            \**************************************************************/
            break;

        case DLL_PROCESS_DETACH:
            /**************************************************************\
            * The calling process is detaching the DLL from its address
            * space.
            \**************************************************************/
            queue_cleanup();
            if (fp_log != NULL)
            {
                fclose(fp_log);
                fp_log = NULL;
            }
            if (lpCtrlBase != NULL)
            {
                UnmapViewOfFile(lpCtrlBase);
                lpCtrlBase = NULL;
            }
            if (hControl != NULL)
            {
                CloseHandle(hControl);
                hControl = NULL;
            }
            break;

        default:
            break;
    }
    return TRUE;
}
/*
 * Display a message box describing a failed assertion.
 *
 * title - caption for the message box
 * exp   - text of the expression that failed
 * file  - source file name where the assertion lives
 * line  - source line number of the assertion
 *
 * The %.200s precisions cap how much of exp and file is copied, so the
 * formatted text always fits in the 512-byte buffer regardless of the
 * length of the incoming strings.  MessageBox takes the body text before
 * the caption, so the formatted description is passed as the text and
 * title as the caption.
 */
VOID MyAssertBox( const char *title, const char *exp,
                  const char *file, int line)
{
    char buffer[512];

    sprintf( buffer, "Assert(%.200s) at (%.200s::%d)", exp, file, line);
    MessageBox (NULL, buffer, title, MB_OK | MB_ICONHAND);
}
BOOL attach_ctl()
{
PQUEUE_DATA pqd;
int ii;
SetErrorMode(SEM_FAILCRITICALERRORS);
hGlobalSem = CreateSemaphore( NULL, 0, 1, "JMH_QUEUE_SEM");
if (hGlobalSem)
{
//
// This is correct, we use GetLastError() to find out how the
// function succeeded!
//
if (GetLastError() == ERROR_ALREADY_EXISTS)
{
//
// We didn't create it, just attach to it so we need to
// wait till its free.
//
WaitForSingleObject(hGlobalSem, INFINITE);
dwProcesses++;
}
else
{
dwProcesses = 1;
}
//
// The heart of the matter, create the memory mapped files.
//
hControl = CreateFileMapping(
(HANDLE) INFINITE,
NULL,
PAGE_READWRITE,
0,
64 * 1024,
"JMH_QUEUE_CTRL");
if (hControl == NULL)
fprintf( fp_log, "CreateFileMapping Failed\n");
//
// Now attach the memory to the correct addresses
//
lpCtrlBase = MapViewOfFile( hControl, FILE_MAP_WRITE, 0, 0, 0);
if (lpCtrlBase == NULL)
{
fprintf( fp_log, "MapViewOfFile Failed\n");
ReleaseSemaphore(hGlobalSem, 1, NULL);
CloseHandle(hControl);
return(FALSE);
}
//
// First time initialization.
//
if (dwProcesses == 1)
{
iQueues = 0;
pqd = (PQUEUE_DATA) lpCtrlBase;
for (ii = 0; ii < MAX_QUEUES; ii++ )
{
pqd[ii].aQueueName = 0;
pqd[ii].sElements = 0;
}
}
ReleaseSemaphore(hGlobalSem, 1, NULL);
return(TRUE);
}
else
return (FALSE);
}
/*
 * Check that hq refers to a readable queue handle structure carrying the
 * expected magic value.
 *
 * Returns TRUE when the handle looks valid, FALSE otherwise.
 *
 * The readability probe covers the whole structure that hq points to
 * (sizeof(*hq)), not just the size of the pointer itself, and any nonzero
 * result from IsBadReadPtr is treated as "bad".
 */
BOOL IsValidHqueue( HQUEUE hq)
{
    if (hq == NULL || IsBadReadPtr(hq, sizeof(*hq)))
        return(FALSE);
    if (hq->dwMagic != MAGIC)
        return(FALSE);
    return(TRUE);
}
/*
 * Create a new named queue and return a handle to it.
 *
 * phq         - receives the new queue handle on success
 * fQueueOrder - ordering type stored in the queue's control entry
 * pszName     - queue name; a leading "\queues\" prefix is skipped.  The
 *               remaining name must be non-empty and contain no backslash.
 *
 * Returns 0 on success, QUE_INVALID_NAME for a bad name, QUE_NO_MEMORY
 * when no free slot or heap memory is available, or the GetLastError()
 * value of the Win32 call that failed.
 *
 * All failure paths release every resource acquired so far (atom, event,
 * semaphore, heap blocks) and the global semaphore.  The error code is
 * captured immediately after the failing call so later cleanup calls
 * cannot overwrite it.
 *
 * NOTE(review): iQueues is compared against QUEUE_LIMIT here but is not
 * updated anywhere in this function - confirm where it is maintained.
 */
DWORD CreateQueue( PHQUEUE phq, int fQueueOrder, LPTSTR pszName)
{
    HQUEUE      hq      = NULL;
    LPTSTR      pszBase = NULL;
    HANDLE      hEvent  = NULL;
    HANDLE      hSem    = NULL;
    PQUEUE_DATA pqd;
    ATOM        atom;
    DWORD       dwRet   = QUE_NO_MEMORY;
    int         cb;
    int         iIndex  = -1;
    int         ii;

    if (pszName == NULL)
        return(QUE_INVALID_NAME);
    if (strncmp(pszName, "\\queues\\", 8) == 0)
        pszName += 8;
    if ((cb = (int) strlen(pszName)) == 0)
        return(QUE_INVALID_NAME);
    if (strchr(pszName, '\\') != NULL)
        return(QUE_INVALID_NAME);

    if ((atom = GlobalAddAtom(pszName)) == 0)
        return(GetLastError());

    /*
     * Do the heap allocations before taking the global semaphore.  The
     * scratch buffer must hold the name plus the longest suffix
     * ("_Event") and its terminating NUL.
     */
    hq      = (HQUEUE) malloc(sizeof(*hq));
    pszBase = (LPTSTR) malloc(cb + sizeof("_Event"));
    if (hq == NULL || pszBase == NULL)
    {
        free(hq);
        free(pszBase);
        GlobalDeleteAtom(atom);
        return(QUE_NO_MEMORY);
    }

    WaitForSingleObject(hGlobalSem, INFINITE);
    pqd = (PQUEUE_DATA) lpCtrlBase;

    /* Look for a free control slot (one with no name atom). */
    if (iQueues != QUEUE_LIMIT)
    {
        for (ii = 0; ii < QUEUE_LIMIT; ii++)
        {
            if (pqd[ii].aQueueName == 0)
            {
                iIndex = ii;
                break;
            }
        }
    }
    if (iIndex == -1)
        goto fail;              /* dwRet is still QUE_NO_MEMORY */

    strcpy( pszBase, pszName);
    strcat( pszBase, "_Event");
    hEvent = CreateEvent( NULL, TRUE, FALSE, pszBase);
    if (hEvent == NULL)
    {
        dwRet = GetLastError();
        goto fail;
    }

    strcpy( pszBase, pszName);
    strcat( pszBase, "_Sem");
    hSem = CreateSemaphore( NULL, 1, 1, pszBase);
    if (hSem == NULL)
    {
        dwRet = GetLastError();
        goto fail;
    }

    hq->dwMagic  = MAGIC;
    hq->hEvent   = hEvent;
    hq->hSem     = hSem;
    hq->iIndex   = iIndex;
    hq->dwProcId = GetCurrentProcessId();

    pqd[iIndex].wType      = fQueueOrder;
    pqd[iIndex].dwCreator  = hq->dwProcId;
    pqd[iIndex].aQueueName = atom;
    pqd[iIndex].sElements  = 0;
    *phq = hq;

    ReleaseSemaphore(hGlobalSem, 1, NULL);
    free(pszBase);              /* only needed to build the object names */
    return(0);

fail:
    ReleaseSemaphore(hGlobalSem, 1, NULL);
    if (hSem != NULL)
        CloseHandle(hSem);
    if (hEvent != NULL)
        CloseHandle(hEvent);
    GlobalDeleteAtom(atom);
    free(pszBase);
    free(hq);
    return(dwRet);
}
/*
 * Close a queue handle obtained from CreateQueue or OpenQueue.
 *
 * hq - handle to close
 *
 * Returns QUE_INVALID_HANDLE when hq fails IsValidHqueue, otherwise 0.
 *
 * The handle's event is closed and its memory freed.  If the control
 * entry still has a name atom, one reference to it is dropped.  When the
 * closing process is the one recorded as the queue's creator, every
 * element still queued is handed to ShrFree and the control entry is
 * marked free (name atom and creator cleared).
 *
 * The magic value is wiped before the handle memory is freed so that a
 * stale copy of the handle is less likely to pass IsValidHqueue later.
 */
DWORD CloseQueue( HQUEUE hq)
{
    PQUEUE_DATA pqd;
    int         iIndex;
    int         ii;
    DWORD       dwProcId;
    HANDLE      hSem;

    if (!IsValidHqueue( hq))
        return(QUE_INVALID_HANDLE);

    hSem = hq->hSem;
    WaitForSingleObject(hSem, INFINITE);

    /* Copy what is still needed out of the handle before it is freed. */
    iIndex   = hq->iIndex;
    dwProcId = hq->dwProcId;
    pqd      = (PQUEUE_DATA) lpCtrlBase;

    CloseHandle(hq->hEvent);
    hq->dwMagic = 0;
    free(hq);

    if (pqd[iIndex].aQueueName != 0)
        GlobalDeleteAtom(pqd[iIndex].aQueueName);

    if (dwProcId == pqd[iIndex].dwCreator)
    {
        /* The creator is closing: release pending elements and the slot. */
        for (ii = 0; ii < pqd[iIndex].sElements ; ii++)
        {
            ShrFree(pqd[iIndex].elem[ii].dwShrHandle);
        }
        pqd[iIndex].aQueueName = 0;
        pqd[iIndex].dwCreator = 0;
    }

    ReleaseSemaphore(hSem, 1, NULL);
    CloseHandle(hSem);
    return(0);
}
/*
 * Open an existing named queue.
 *
 * ppid    - receives the creator id stored in the queue's control entry
 * phq     - receives the new queue handle on success
 * pszName - queue name; a leading "\queues\" prefix is skipped.  The
 *           remaining name must be non-empty and contain no backslash.
 *
 * Returns 0 on success, QUE_INVALID_NAME for a bad name, QUE_NO_MEMORY
 * when heap allocation fails, QUE_NAME_NOT_EXIST when no control entry
 * carries the name's atom, or the GetLastError() value of the Win32 call
 * that failed.
 *
 * Every failure path releases the global semaphore and all resources
 * acquired so far, including the atom reference added here.  The error
 * code is captured right after the failing call so cleanup cannot
 * overwrite it.
 */
DWORD OpenQueue(LPDWORD ppid, PHQUEUE phq, LPTSTR pszName)
{
    HQUEUE      hq      = NULL;
    LPTSTR      pszBase = NULL;
    HANDLE      hEvent  = NULL;
    HANDLE      hSem    = NULL;
    PQUEUE_DATA pqd     = (PQUEUE_DATA) lpCtrlBase;
    ATOM        atom;
    DWORD       dwRet;
    int         cb;
    int         iIndex  = -1;
    int         ii;

    if (pszName == NULL)
        return(QUE_INVALID_NAME);
    if (strncmp(pszName, "\\queues\\", 8) == 0)
        pszName += 8;
    if ((cb = (int) strlen(pszName)) == 0)
        return(QUE_INVALID_NAME);
    if (strchr(pszName, '\\') != NULL)
        return(QUE_INVALID_NAME);

    if ((atom = GlobalAddAtom(pszName)) == 0)
        return(GetLastError());

    /*
     * The scratch buffer must hold the name plus the longest suffix
     * ("_Event") and its terminating NUL.
     */
    hq      = (HQUEUE) malloc(sizeof(*hq));
    pszBase = (LPTSTR) malloc(cb + sizeof("_Event"));
    if (hq == NULL || pszBase == NULL)
    {
        free(hq);
        free(pszBase);
        GlobalDeleteAtom(atom);
        return(QUE_NO_MEMORY);
    }

    WaitForSingleObject(hGlobalSem, INFINITE);

    /* Find the control entry whose name atom matches. */
    for (ii = 0; ii < QUEUE_LIMIT; ii++)
    {
        if (pqd[ii].aQueueName == atom)
        {
            iIndex = ii;
            break;
        }
    }
    if (iIndex == -1)
    {
        dwRet = QUE_NAME_NOT_EXIST;
        goto fail;
    }

    strcpy( pszBase, pszName);
    strcat( pszBase, "_Event");
    hEvent = CreateEvent( NULL, TRUE, FALSE, pszBase);
    if (hEvent == NULL)
    {
        dwRet = GetLastError();
        goto fail;
    }

    strcpy( pszBase, pszName);
    strcat( pszBase, "_Sem");
    hSem = CreateSemaphore( NULL, 1, 1, pszBase);
    if (hSem == NULL)
    {
        dwRet = GetLastError();
        goto fail;
    }

    hq->dwMagic  = MAGIC;
    hq->hEvent   = hEvent;
    hq->hSem     = hSem;
    hq->iIndex   = iIndex;
    hq->dwProcId = GetCurrentProcessId();

    *ppid = pqd[iIndex].dwCreator;
    *phq  = hq;

    ReleaseSemaphore(hGlobalSem, 1, NULL);
    free(pszBase);              /* only needed to build the object names */
    return(0);

fail:
    ReleaseSemaphore(hGlobalSem, 1, NULL);
    if (hSem != NULL)
        CloseHandle(hSem);
    if (hEvent != NULL)
        CloseHandle(hEvent);
    GlobalDeleteAtom(atom);
    free(pszBase);
    free(hq);
    return(dwRet);
}
void push_down( int iIndex, int jj)
{
PQUEUE_DATA pqd;
int ii;
pqd = (PQUEUE_DATA) lpCtrlBase;
for (ii = pqd[iIndex].sElements; ii > jj; ii--)
pqd[iIndex].elem[ii] = pqd[iIndex].elem[ii -1];
}
/*
 * Close the gap at position jj in queue iIndex by moving every element
 * after jj one slot towards the front.  The element count itself is not
 * changed here.
 *
 * The loop stops one short of the last live element: only slots below
 * sElements hold live data, so elem[sElements] is never read.
 */
void push_up( int iIndex, int jj)
{
    PQUEUE_DATA pqd;
    int ii;

    pqd = (PQUEUE_DATA) lpCtrlBase;
    for (ii = jj; ii + 1 < pqd[iIndex].sElements; ii++)
        pqd[iIndex].elem[ii] = pqd[iIndex].elem[ii + 1];
}
/*
 * Add an element to a queue and signal the queue's event.
 *
 * hq          - queue handle
 * dwEventCode - event code stored with the element
 * dwShrHandle - shared-memory handle stored with the element
 * dwPriority  - priority stored with the element (orders QUE_PRIORITY
 *               queues)
 *
 * Returns QUE_INVALID_HANDLE when hq fails IsValidHqueue, QUE_NO_MEMORY
 * when the queue already holds QUEUE_LIMIT elements, otherwise 0.
 *
 * Placement by queue type:
 *   QUE_LIFO     - inserted at the front
 *   QUE_PRIORITY - inserted before the first element whose priority is
 *                  not greater than dwPriority
 *   QUE_FIFO     - appended at the end
 * Any other type value is treated like QUE_FIFO so that the insertion
 * index is always defined.
 */
DWORD WriteQueue(HQUEUE hq, DWORD dwEventCode, DWORD dwShrHandle, DWORD dwPriority)
{
    int iIndex;
    PQUEUE_DATA pqd;
    int ii;

    if (!IsValidHqueue( hq))
        return(QUE_INVALID_HANDLE);

    WaitForSingleObject( hq->hSem, INFINITE);
    iIndex = hq->iIndex;
    pqd = (PQUEUE_DATA) lpCtrlBase;

    if (pqd[iIndex].sElements == QUEUE_LIMIT)
    {
        ReleaseSemaphore(hq->hSem, 1, NULL);
        return(QUE_NO_MEMORY);
    }

    if (pqd[iIndex].wType == QUE_LIFO)
    {
        push_down(iIndex, 0);
        ii = 0;
    }
    else if (pqd[iIndex].wType == QUE_PRIORITY)
    {
        ii = 0;
        while (ii < pqd[iIndex].sElements &&
               dwPriority < pqd[iIndex].elem[ii].dwPriority)
            ii++;
        /* Inserting in the middle: make room first. */
        if (ii != pqd[iIndex].sElements)
            push_down( iIndex, ii);
    }
    else
    {
        /* QUE_FIFO, and the fallback for unrecognised types. */
        ii = pqd[iIndex].sElements;
    }

    pqd[iIndex].elem[ii].dwWriterId = hq->dwProcId;
    pqd[iIndex].elem[ii].dwShrHandle = dwShrHandle;
    pqd[iIndex].elem[ii].dwEventCode = dwEventCode;
    pqd[iIndex].elem[ii].dwPriority = dwPriority;
    pqd[iIndex].sElements++;

    SetEvent(hq->hEvent);
    ReleaseSemaphore(hq->hSem, 1, NULL);
    return(0);
}
/*
 * Remove an element from a queue and copy it to the caller.
 *
 * hq       - queue handle; its process id must match the creator recorded
 *            in the queue's control entry
 * pqe      - receives a copy of the removed element
 * iElement - index of the element to remove; negative values are treated
 *            as 0
 * fWait    - when the queue is empty: TRUE waits on the queue event until
 *            an element arrives, FALSE returns QUE_EMPTY at once
 *
 * Returns 0 on success, QUE_INVALID_HANDLE for a bad handle or a caller
 * that is not the creator, QUE_ELEMENT_NOT_EXIST when a non-zero
 * iElement is past the last element, or QUE_EMPTY.
 *
 * The queue semaphore is released on every return path, including the
 * QUE_EMPTY one.
 */
DWORD ReadQueue(HQUEUE hq, PQ_ELEMENT pqe, int iElement, BOOL fWait)
{
    int iIndex;
    PQUEUE_DATA pqd;

    if (!IsValidHqueue( hq))
        return(QUE_INVALID_HANDLE);

    WaitForSingleObject(hq->hSem, INFINITE);
    iIndex = hq->iIndex;
    pqd = (PQUEUE_DATA) lpCtrlBase;

    if (hq->dwProcId != pqd[iIndex].dwCreator)
    {
        ReleaseSemaphore(hq->hSem, 1, NULL);
        return(QUE_INVALID_HANDLE);
    }

    if (iElement < 0)
        iElement = 0;
    if (iElement >= pqd[iIndex].sElements && iElement != 0)
    {
        ReleaseSemaphore(hq->hSem, 1, NULL);
        return(QUE_ELEMENT_NOT_EXIST);
    }

    if (pqd[iIndex].sElements == 0)
    {
        if (!fWait)
        {
            ReleaseSemaphore(hq->hSem, 1, NULL);
            return(QUE_EMPTY);
        }
        /*
         * Drop the semaphore while blocked on the event so writers can
         * get in, then take it again before re-checking the count.
         */
        while (pqd[iIndex].sElements == 0)
        {
            ReleaseSemaphore(hq->hSem, 1, NULL);
            WaitForSingleObject(hq->hEvent, INFINITE);
            WaitForSingleObject(hq->hSem, INFINITE);
        }
    }

    *pqe = pqd[iIndex].elem[iElement];
    push_up(iIndex, iElement);
    pqd[iIndex].sElements --;
    if (pqd[iIndex].sElements == 0)
        ResetEvent(hq->hEvent);

    ReleaseSemaphore(hq->hSem, 1, NULL);
    return(0);
}
/*
 * Return the event handle stored in a queue handle.
 *
 * hq - queue handle
 *
 * Returns hq->hEvent, or NULL when hq fails IsValidHqueue (which also
 * covers a NULL or unreadable pointer, matching the validation used by
 * the other queue entry points).
 */
HANDLE GetQueueEventHandle( HQUEUE hq)
{
    if (!IsValidHqueue( hq))
        return(NULL);
    return( hq->hEvent);
}
/*
 * Copy an element of a queue to the caller without removing it.
 *
 * hq        - queue handle; its process id must match the creator
 *             recorded in the queue's control entry
 * pqe       - receives a copy of the element
 * piElement - in/out cursor: a negative value is reset to 0, any other
 *             value is incremented, and the resulting index is the one
 *             examined.  The update happens before the range check, so
 *             the cursor is changed even when an error is returned.
 * fWait     - when the queue is empty: TRUE waits on the queue event
 *             until an element arrives, FALSE returns QUE_EMPTY at once
 *
 * Returns 0 on success, QUE_INVALID_HANDLE for a bad handle or a caller
 * that is not the creator, QUE_ELEMENT_NOT_EXIST when a non-zero index is
 * past the last element, or QUE_EMPTY.
 *
 * The queue semaphore is released on every return path, including the
 * QUE_EMPTY one.
 */
DWORD PeekQueue(HQUEUE hq, PQ_ELEMENT pqe, int *piElement, BOOL fWait)
{
    int iIndex;
    PQUEUE_DATA pqd;

    if (!IsValidHqueue( hq))
        return(QUE_INVALID_HANDLE);

    WaitForSingleObject(hq->hSem, INFINITE);
    iIndex = hq->iIndex;
    pqd = (PQUEUE_DATA) lpCtrlBase;

    if (hq->dwProcId != pqd[iIndex].dwCreator)
    {
        ReleaseSemaphore(hq->hSem, 1, NULL);
        return(QUE_INVALID_HANDLE);
    }

    if (*piElement < 0)
        *piElement = 0;
    else
        (*piElement)++;

    if (*piElement >= pqd[iIndex].sElements && *piElement != 0)
    {
        ReleaseSemaphore(hq->hSem, 1, NULL);
        return(QUE_ELEMENT_NOT_EXIST);
    }

    if (pqd[iIndex].sElements == 0)
    {
        if (!fWait)
        {
            ReleaseSemaphore(hq->hSem, 1, NULL);
            return(QUE_EMPTY);
        }
        /*
         * Drop the semaphore while blocked on the event so writers can
         * get in, then take it again before re-checking the count.
         */
        while (pqd[iIndex].sElements == 0)
        {
            ReleaseSemaphore(hq->hSem, 1, NULL);
            WaitForSingleObject(hq->hEvent, INFINITE);
            WaitForSingleObject(hq->hSem, INFINITE);
        }
    }

    *pqe = pqd[iIndex].elem[*piElement];
    ReleaseSemaphore(hq->hSem, 1, NULL);
    return(0);
}